package com.tanx.cqrs.infrastructure.spring.event;

import com.rabbitmq.client.*;
import com.tanx.cqrs.event.RoutingKeyGenerator;
import com.tanx.cqrs.event.handler.EventHandlerResolver;
import com.tanx.cqrs.infrastructure.spring.event.store.EventStoreRepositoryFactory;
import com.tanx.cqrs.saga.SagaHandlerResolver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.concurrent.TimeoutException;

/**
 * rabbitmq 事件总线
 */
@Slf4j
public class RabbitMqEventBus extends AbstractEventBus implements Runnable, Consumer {
    private static final String DEFAULT_EXCHANGE_NAME = "cqrsExchange";
    private final String exchange;
    private final String queqe;
    private RoutingKeyGenerator generator;
    private ConnectionFactory connectionFactory;
    private RabbitMqSetting mqSetting;
    private Connection connection;
    private Channel channel;

    public RabbitMqEventBus(EventHandlerResolver resolver, SagaHandlerResolver sagaHandlerResolver,
                            RoutingKeyGenerator generator, RabbitMqSetting mqSetting,
                            EventStoreRepositoryFactory factory) {
        super(resolver, sagaHandlerResolver, factory);
        this.generator = generator;
        this.mqSetting = mqSetting;
        this.connectionFactory = createConnectionFactory();
        try {
            this.connection = connectionFactory.newConnection();
            this.channel = connection.createChannel();
            this.queqe = getQueueName();
            this.exchange = getExchange();
            channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true, false, null);
            channel.queueDeclare(queqe, true, false, false, null);
            channel.queueBind(queqe, exchange, mqSetting.getRoutingKey());
        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    private ConnectionFactory createConnectionFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(mqSetting.getHost());
        factory.setPort(mqSetting.getPort());
        if (!StringUtils.isEmpty(mqSetting.getUsername())) {
            factory.setUsername(mqSetting.getUsername());
            factory.setPassword(mqSetting.getPassword());
        }
        return factory;
    }

    @Override
    public void receiveEvent() {
        new Thread(this).start();
    }

    @Override
    protected void sendEvent(EventProxy eventProxy) {
        try {
            String routingKey = getRoutingKey(eventProxy);
            channel.basicPublish(exchange, routingKey, null, getEventByte(eventProxy));
            log.info("发送消息:{},routingKey:{}", eventProxy, routingKey);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private String getRoutingKey(EventProxy eventProxy) {
        return generator.generate(eventProxy.getEvent());
    }

    private String getExchange() {
        return StringUtils.isEmpty(mqSetting.getExchangeName()) ? DEFAULT_EXCHANGE_NAME : mqSetting.getExchangeName();
    }

    /**
     * 获取系统名称
     */
    public String getQueueName() {
        try {
            return java.net.InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void handleConsumeOk(String consumerTag) {
        log.info("handleConsumeOk {}", consumerTag);
    }

    @Override
    public void handleCancelOk(String consumerTag) {
        log.info("handleCancelOk {}", consumerTag);
    }

    @Override
    public void handleCancel(String consumerTag) throws IOException {
        log.info("handleCancel {}", consumerTag);
    }

    @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        log.info("handleShutdownSignal {}", consumerTag);
    }

    @Override
    public void handleRecoverOk(String consumerTag) {
        log.info("handleRecoverOk {}", consumerTag);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        log.info("handleDelivery {}", consumerTag);
        EventProxy event = parseEventFormByte(body);
        log.info("收到消息{}", event);
        consumeEvent(event);
        channel.basicAck(envelope.getDeliveryTag(), false);
    }

    @Override
    public void run() {
        try {
            channel.basicQos(mqSetting.getBasicQos());
            channel.basicConsume(queqe, false, this);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
